1   package org.apache.lucene.replicator;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  import java.io.IOException;
21  import java.util.Collections;
22  import java.util.List;
23  import java.util.Map;
24  import java.util.concurrent.Callable;
25  
26  import org.apache.lucene.index.DirectoryReader;
27  import org.apache.lucene.index.IndexCommit;
28  import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
29  import org.apache.lucene.store.Directory;
30  import org.apache.lucene.store.IOContext;
31  import org.apache.lucene.util.InfoStream;
32  
33  /**
34   * A {@link ReplicationHandler} for replication of an index and taxonomy pair.
35   * See {@link IndexReplicationHandler} for more detail. This handler ensures
36   * that the search and taxonomy indexes are replicated in a consistent way.
37   * <p>
38   * <b>NOTE:</b> if you intend to recreate a taxonomy index, you should make sure
39   * to reopen an IndexSearcher and TaxonomyReader pair via the provided callback,
40   * to guarantee that both indexes are in sync. This handler does not prevent
41   * replicating such index and taxonomy pairs, and if they are reopened by a
42   * different thread, unexpected errors can occur, as well as inconsistency
43   * between the taxonomy and index readers.
44   * 
45   * @see IndexReplicationHandler
46   * 
47   * @lucene.experimental
48   */
49  public class IndexAndTaxonomyReplicationHandler implements ReplicationHandler {
50    
51    /**
52     * The component used to log messages to the {@link InfoStream#getDefault()
53     * default} {@link InfoStream}.
54     */
55    public static final String INFO_STREAM_COMPONENT = "IndexAndTaxonomyReplicationHandler";
56  
57    private final Directory indexDir;
58    private final Directory taxoDir;
59    private final Callable<Boolean> callback;
60    
61    private volatile Map<String,List<RevisionFile>> currentRevisionFiles;
62    private volatile String currentVersion;
63    private volatile InfoStream infoStream = InfoStream.getDefault();
64  
65    /**
66     * Constructor with the given index directory and callback to notify when the
67     * indexes were updated.
68     */
69    public IndexAndTaxonomyReplicationHandler(Directory indexDir, Directory taxoDir, Callable<Boolean> callback)
70        throws IOException {
71      this.callback = callback;
72      this.indexDir = indexDir;
73      this.taxoDir = taxoDir;
74      currentRevisionFiles = null;
75      currentVersion = null;
76      final boolean indexExists = DirectoryReader.indexExists(indexDir);
77      final boolean taxoExists = DirectoryReader.indexExists(taxoDir);
78      if (indexExists != taxoExists) {
79        throw new IllegalStateException("search and taxonomy indexes must either both exist or not: index=" + indexExists
80            + " taxo=" + taxoExists);
81      }
82      if (indexExists) { // both indexes exist
83        final IndexCommit indexCommit = IndexReplicationHandler.getLastCommit(indexDir);
84        final IndexCommit taxoCommit = IndexReplicationHandler.getLastCommit(taxoDir);
85        currentRevisionFiles = IndexAndTaxonomyRevision.revisionFiles(indexCommit, taxoCommit);
86        currentVersion = IndexAndTaxonomyRevision.revisionVersion(indexCommit, taxoCommit);
87        final InfoStream infoStream = InfoStream.getDefault();
88        if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
89          infoStream.message(INFO_STREAM_COMPONENT, "constructor(): currentVersion=" + currentVersion
90              + " currentRevisionFiles=" + currentRevisionFiles);
91          infoStream.message(INFO_STREAM_COMPONENT, "constructor(): indexCommit=" + indexCommit
92              + " taxoCommit=" + taxoCommit);
93        }
94      }
95    }
96    
97    @Override
98    public String currentVersion() {
99      return currentVersion;
100   }
101   
102   @Override
103   public Map<String,List<RevisionFile>> currentRevisionFiles() {
104     return currentRevisionFiles;
105   }
106   
107   @Override
108   public void revisionReady(String version, Map<String,List<RevisionFile>> revisionFiles,
109       Map<String,List<String>> copiedFiles, Map<String,Directory> sourceDirectory) throws IOException {
110     Directory taxoClientDir = sourceDirectory.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE);
111     Directory indexClientDir = sourceDirectory.get(IndexAndTaxonomyRevision.INDEX_SOURCE);
112     List<String> taxoFiles = copiedFiles.get(IndexAndTaxonomyRevision.TAXONOMY_SOURCE);
113     List<String> indexFiles = copiedFiles.get(IndexAndTaxonomyRevision.INDEX_SOURCE);
114     String taxoSegmentsFile = IndexReplicationHandler.getSegmentsFile(taxoFiles, true);
115     String indexSegmentsFile = IndexReplicationHandler.getSegmentsFile(indexFiles, false);
116     String taxoPendingFile = taxoSegmentsFile == null ? null : "pending_" + taxoSegmentsFile;
117     String indexPendingFile = "pending_" + indexSegmentsFile;
118     
119     boolean success = false;
120     try {
121       // copy taxonomy files before index files
122       IndexReplicationHandler.copyFiles(taxoClientDir, taxoDir, taxoFiles);
123       IndexReplicationHandler.copyFiles(indexClientDir, indexDir, indexFiles);
124 
125       // fsync all copied files (except segmentsFile)
126       if (!taxoFiles.isEmpty()) {
127         taxoDir.sync(taxoFiles);
128       }
129       indexDir.sync(indexFiles);
130       
131       // now copy, fsync, and rename segmentsFile, taxonomy first because it is ok if a
132       // reader sees a more advanced taxonomy than the index.
133       
134       if (taxoSegmentsFile != null) {
135         taxoDir.copyFrom(taxoClientDir, taxoSegmentsFile, taxoPendingFile, IOContext.READONCE);
136       }
137       indexDir.copyFrom(indexClientDir, indexSegmentsFile, indexPendingFile, IOContext.READONCE);
138       
139       if (taxoSegmentsFile != null) {
140         taxoDir.sync(Collections.singletonList(taxoPendingFile));
141       }
142       indexDir.sync(Collections.singletonList(indexPendingFile));
143       
144       if (taxoSegmentsFile != null) {
145         taxoDir.renameFile(taxoPendingFile, taxoSegmentsFile);
146       }
147       
148       indexDir.renameFile(indexPendingFile, indexSegmentsFile);
149       
150       success = true;
151     } finally {
152       if (!success) {
153         if (taxoSegmentsFile != null) {
154           taxoFiles.add(taxoSegmentsFile); // add it back so it gets deleted too
155           taxoFiles.add(taxoPendingFile);
156         }
157         IndexReplicationHandler.cleanupFilesOnFailure(taxoDir, taxoFiles);
158         indexFiles.add(indexSegmentsFile); // add it back so it gets deleted too
159         indexFiles.add(indexPendingFile);
160         IndexReplicationHandler.cleanupFilesOnFailure(indexDir, indexFiles);
161       }
162     }
163 
164     // all files have been successfully copied + sync'd. update the handler's state
165     currentRevisionFiles = revisionFiles;
166     currentVersion = version;
167     
168     if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
169       infoStream.message(INFO_STREAM_COMPONENT, "revisionReady(): currentVersion=" + currentVersion
170           + " currentRevisionFiles=" + currentRevisionFiles);
171     }
172     
173     // Cleanup the index directory from old and unused index files.
174     // NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
175     // side-effects, e.g. if it hits sudden IO errors while opening the index
176     // (and can end up deleting the entire index). It is not our job to protect
177     // against those errors, app will probably hit them elsewhere.
178     IndexReplicationHandler.cleanupOldIndexFiles(indexDir, indexSegmentsFile, infoStream);
179     IndexReplicationHandler.cleanupOldIndexFiles(taxoDir, taxoSegmentsFile, infoStream);
180 
181     // successfully updated the index, notify the callback that the index is
182     // ready.
183     if (callback != null) {
184       try {
185         callback.call();
186       } catch (Exception e) {
187         throw new IOException(e);
188       }
189     }
190   }
191 
192   /** Sets the {@link InfoStream} to use for logging messages. */
193   public void setInfoStream(InfoStream infoStream) {
194     if (infoStream == null) {
195       infoStream = InfoStream.NO_OUTPUT;
196     }
197     this.infoStream = infoStream;
198   }
199   
200 }